Adaptive filter scheduling in the Parquet decoder (replaces PR #9)#11
```
run benchmark clickbench_partitioned
baseline:
  ref: main
  env:
    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
    DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false
changed:
  ref: HEAD
  env:
    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
    DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true
```
Replaces PR #9's morsel-per-row-group split with an in-decoder strategy swap. One `ParquetPushDecoder` per file, one `BoxStream` per file, filter placement re-evaluated at every row-group boundary using the shared `SelectivityTracker`.

# What's removed (vs PR #9)

- The chunk loop (`ParquetAccessPlan::split_into_chunks`, `Vec<BoxStream>` returns from `build_stream`).
- Per-chunk `AsyncFileReader::create_reader` minting and per-chunk `RowFilter` rebuild.
- The `EarlyStoppingStream`-on-chunk-0-only special case for the non-`Clone` `FilePruner`.
- `LazyMorselShared` per-morsel Arc churn — the source of the ~10% aggregate ClickBench regression you flagged in PR #9 review.

# What's added

`AdaptiveParquetStream` (new in `opener.rs`) drives one row group at a time via `try_next_reader`:

1. Pull a `ParquetRecordBatchReader` for the next row group.
2. Iterate it synchronously; each batch goes through any post-scan filters (which feed per-filter stats into the tracker) and then through the projector.
3. When the reader exhausts, ask the tracker to re-partition filters based on accumulated stats. If the row-filter set changed, build a new `RowFilter` and call the new arrow-rs `ParquetPushDecoder::swap_strategy` before requesting the next reader. Post-scan filters update in lockstep.

`PushBuffers` carries through the swap so already-fetched bytes are preserved, and the optional-filter mid-stream skip mechanism (existing `OptionalFilterPhysicalExpr` + `tracker.is_filter_skipped`) keeps working unchanged inside `apply_post_scan_filters_with_stats`.

# Carried-over machinery (file-level checkout from `dbcf5ac1e`)

- `selectivity.rs` — `SelectivityTracker`, `PartitionedFilters`, `FilterId`, Welford CI bounds. Verbatim.
- `row_filter.rs` — new `build_row_filter` signature returning `(Option<RowFilter>, UnbuildableFilters)` plus `total_compressed_bytes`, plus `DatafusionArrowPredicate` stat hooks.
- `physical_expr.rs` — `OptionalFilterPhysicalExpr`, `snapshot_generation` helpers. `Display` is **pass-through** here (PR #9 used `Optional(...)`), keeping every existing sqllogictest expected output intact.
- `config.rs` — adds `filter_pushdown_min_bytes_per_sec` / `filter_collecting_byte_ratio_threshold` / `filter_confidence_z`. **`reorder_filters` is preserved as a deprecated no-op** (per request) — the adaptive tracker subsumes it.
- `selectivity_tracker.rs` bench — verbatim.
- Per-file plumbing in `source.rs`: `predicate_conjuncts: Vec<(FilterId, Arc<PhysicalExpr>)>` instead of a single AND-ed predicate so per-conjunct stats accumulate across files.

# arrow-rs companion branch

Depends on `pydantic/arrow-rs:adaptive-strategy-swap`, which adds `ParquetPushDecoder::can_swap_strategy()` / `swap_strategy(StrategySwap)` and the `StrategySwap` builder. The `Cargo.toml` `[patch.crates-io]` block points at it.

# What's not in this PR (deferred)

- Sub-row-group adaptation (would need a `ParquetRecordBatchReader::pause` primitive in arrow-rs to yield a residual `RowSelection`); useful for TPCDS-style single-huge-row-group files. Defer.
- The three new config knobs aren't in the proto schema yet; `from_proto` fills with config defaults so a roundtrip preserves behavior.

# Tests

- `cargo test -p datafusion-datasource-parquet --lib` — 143 passed
- `cargo test -p datafusion --lib` — 410 passed
- `cargo test -p datafusion --test core_integration` — 935 passed
- `cargo test -p datafusion-sqllogictest --test sqllogictests` — all pass except `encrypted_parquet.slt` (pre-existing on upstream/main, not related to this change)

Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
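The three-step loop above can be modeled as a minimal, self-contained sketch. `Tracker`, `drive`, and the 50% selectivity cutoff here are illustrative stand-ins, not the PR's actual `SelectivityTracker` logic or the arrow-rs API; the real swap goes through `ParquetPushDecoder::swap_strategy`, stubbed as a comment:

```rust
// Hedged sketch (not the PR's code) of the per-row-group adaptive loop.

#[derive(Clone, Copy, PartialEq, Debug)]
pub enum Placement {
    RowFilter, // filter evaluated inside the decoder
    PostScan,  // filter evaluated on decoded batches
}

pub struct Tracker {
    rows_seen: u64,
    rows_kept: u64,
}

impl Tracker {
    pub fn new() -> Self {
        Self { rows_seen: 0, rows_kept: 0 }
    }
    pub fn record(&mut self, seen: u64, kept: u64) {
        self.rows_seen += seen;
        self.rows_kept += kept;
    }
    /// Re-partition at a row-group boundary: keep a filter at row level
    /// only while it looks selective (illustrative 50% cutoff).
    pub fn placement(&self) -> Placement {
        if self.rows_seen == 0 {
            return Placement::RowFilter; // no stats yet: start optimistic
        }
        let selectivity = self.rows_kept as f64 / self.rows_seen as f64;
        if selectivity < 0.5 {
            Placement::RowFilter
        } else {
            Placement::PostScan
        }
    }
}

/// Returns the placement in force while each row group was read.
/// Each input tuple is (rows scanned, rows kept) for one row group.
pub fn drive(row_groups: &[(u64, u64)]) -> Vec<Placement> {
    let mut tracker = Tracker::new();
    let mut current = Placement::RowFilter;
    let mut history = Vec::new();
    for &(seen, kept) in row_groups {
        history.push(current); // steps 1-2: pull a reader, iterate, collect stats
        tracker.record(seen, kept);
        let next = tracker.placement(); // step 3: re-evaluate at the boundary
        if next != current {
            // real code: build a new RowFilter and call
            // decoder.swap_strategy(...) before the next try_next_reader
            current = next;
        }
    }
    history
}

fn main() {
    // a barely-selective filter gets demoted to post-scan after one row group
    let history = drive(&[(1000, 900), (1000, 900)]);
    assert_eq!(history, vec![Placement::RowFilter, Placement::PostScan]);
    println!("{history:?}");
}
```

The key property the sketch preserves: placement only changes between row groups, never mid-group, which is exactly the window where `can_swap_strategy()` returns true.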
- Fix 6 broken intra-doc links in `opener.rs`: `RowFilter`, `PushBuffers`, `AsyncFileReader::create_reader`, `SelectivityTracker` weren't visible from the doc-comment scope. Reword to plain backticks for the names that don't have a stable in-scope path; route `SelectivityTracker` through `crate::selectivity::SelectivityTracker`. - Regenerate `docs/source/user-guide/configs.md` via `dev/update_config_docs.sh` to surface the three new `filter_pushdown_min_bytes_per_sec` / `filter_collecting_byte_ratio_threshold` / `filter_confidence_z` rows the CI doc check expects. Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
…33dd62 Picks up the rustdoc fix from the arrow-rs companion branch so the DataFusion CI doc job resolves clean too. Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
Force-pushed `4a4e300` to `d379196`.
The example asserts `pushdown_rows_pruned=1` to demonstrate that the row-filter path actually evicts rows. Under the adaptive scheduler's default `filter_pushdown_min_bytes_per_sec = 100 MB/s`, a small example file's filter starts on the post-scan path (where `pushdown_rows_pruned` stays 0) and the assertion fails. Set `filter_pushdown_min_bytes_per_sec = 0` to disable the throughput check and force every filter to row level — the same lever the `physical_plan/parquet.rs` test harness uses. Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
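A minimal model of the throughput gate described above, assuming a plain bytes-per-second check; `allows_row_filter` and its signature are hypothetical, not the crate's API:

```rust
// Hypothetical model of the filter_pushdown_min_bytes_per_sec gate: a filter
// stays on the row-filter path only if its decode throughput clears the
// floor. A floor of 0 admits every filter, which is what the example needs.

fn allows_row_filter(bytes_filtered: u64, elapsed_secs: f64, min_bytes_per_sec: u64) -> bool {
    if min_bytes_per_sec == 0 {
        return true; // gate disabled: every filter may run at row level
    }
    if elapsed_secs == 0.0 {
        return true; // no measurement yet: start optimistic
    }
    (bytes_filtered as f64 / elapsed_secs) >= min_bytes_per_sec as f64
}

fn main() {
    let floor = 100 * 1024 * 1024; // ~100 MB/s floor (illustrative)
    // a tiny example file filters few bytes, so it falls below the floor...
    assert!(!allows_row_filter(1024, 0.01, floor));
    // ...but a floor of 0 forces the row-level path regardless
    assert!(allows_row_filter(1024, 0.01, 0));
    println!("ok");
}
```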
Force-pushed `d379196` to `88ab545`.
🤖 Benchmark completed (GKE) — clickbench_partitioned, base (merge-base) vs branch. (Instance/lscpu details, resource usage, and result tables omitted.)
```
run benchmarks
baseline:
  ref: main
  env:
    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
    DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true
changed:
  ref: HEAD
  env:
    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
    DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true
```
```
run benchmarks
baseline:
  ref: main
  env:
    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
    DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false
changed:
  ref: HEAD
  env:
    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
    DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false
```
🤖 Benchmark running (GKE) — comparing HEAD (04e7aab) to main using clickbench_partitioned, tpch, and tpcds.
🤖 Benchmark completed (GKE) — tpch, base (merge-base) vs branch. (Result tables omitted.)
🤖 Benchmark completed (GKE) — clickbench_partitioned, base (merge-base) vs branch. (Result tables omitted.)
🤖 Benchmark completed (GKE) — tpcds, base (merge-base) vs branch. (Result tables omitted.)
Adds a `LimitedBatchCoalescer` to `AdaptiveParquetStream`'s post-scan filter path, mirroring `FilterExec`'s behavior. Without it, inline post-scan filtering yields tiny batches (1-100 rows each on selective predicates) directly to TopK, which delays the dynamic filter from tightening: TopK only improves its threshold one small batch at a time, while `FilterExec`'s coalescer ensures the first batch reaching TopK already contains thousands of survivors, letting TopK pick a near-optimal top-K threshold in one shot.

Symptom this fixes: on Q26 (`SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY EventTime LIMIT 10`) at 12 partitions, the branch matches 33-34 file ranges vs 28 for main with `pushdown_filters=false`. With the coalescer, the branch matches 30-32, closing about a third of the gap. The remaining ~2-pruning difference is unexplained but small.

Coalescer parameters match `FilterExec`: `target_batch_size` from the session config, `biggest_coalesce_batch_size = target / 2` (set inside `LimitedBatchCoalescer::new`).

Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
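The coalescing idea can be sketched with plain vectors standing in for `RecordBatch`es; `Coalescer` here is a hypothetical stand-in for DataFusion's `LimitedBatchCoalescer`, not its API:

```rust
// Hedged sketch: buffer the survivors of small filtered batches and emit a
// combined batch only once a target size is reached, so the downstream
// operator (TopK in the PR) sees one large batch instead of many tiny ones.

pub struct Coalescer {
    target: usize,
    buffer: Vec<i32>,
}

impl Coalescer {
    pub fn new(target: usize) -> Self {
        Self { target, buffer: Vec::new() }
    }
    /// Buffer one small filtered batch; emit a combined batch once the
    /// target size is reached.
    pub fn push(&mut self, rows: &[i32]) -> Option<Vec<i32>> {
        self.buffer.extend_from_slice(rows);
        if self.buffer.len() >= self.target {
            Some(std::mem::take(&mut self.buffer))
        } else {
            None
        }
    }
    /// Flush whatever is left at end of stream.
    pub fn finish(&mut self) -> Option<Vec<i32>> {
        if self.buffer.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.buffer))
        }
    }
}

fn main() {
    let mut c = Coalescer::new(5);
    assert_eq!(c.push(&[1, 2]), None); // too small: held back from downstream
    assert_eq!(c.push(&[3, 4, 5]), Some(vec![1, 2, 3, 4, 5]));
    assert_eq!(c.push(&[6]), None);
    assert_eq!(c.finish(), Some(vec![6]));
    println!("ok");
}
```

The design point is the same as in the comment above: the first batch the consumer sees already carries `target` survivors, so a TopK-style operator can tighten its threshold in one shot.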
```
run benchmark clickbench_partitioned
baseline:
  ref: main
  env:
    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
    DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false
changed:
  ref: HEAD
  env:
    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
    DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true
```
🤖 Benchmark running (GKE) — comparing HEAD (d146ebe) to main using clickbench_partitioned.
🤖 Benchmark completed (GKE) — clickbench_partitioned, base (merge-base) vs branch. (Result tables omitted.)
```
run benchmark clickbench_partitioned
baseline:
  ref: main
  env:
    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
    DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false
changed:
  ref: 04e7aab88
  env:
    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
    DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true
```

Benchmark `04e7aab` (no coalescer, no prune_rate gate) to isolate the coalescer's impact.
🤖 Benchmark running (GKE) — comparing 04e7aab to main using clickbench_partitioned.
🤖 Benchmark completed (GKE) — clickbench_partitioned, base (merge-base) vs branch. (Result tables omitted.)
This reverts commit d146ebe.
# Summary

Replaces PR #9's morsel-per-row-group split with an in-decoder strategy swap: one `ParquetPushDecoder` per file, one `BoxStream` per file, filter placement re-evaluated at every row-group boundary using the shared `SelectivityTracker`. Filters can now adapt mid-stream (between row groups) without splitting files into chunks. The arrow-rs companion change adds a small `ParquetPushDecoder::swap_strategy` API; the DataFusion side uses it from a single adaptive stream wrapper.

# arrow-rs companion branch

This PR depends on `pydantic/arrow-rs` branch `adaptive-strategy-swap` (CI green at pydantic/arrow-rs#9), referenced via `[patch.crates-io]` in the workspace `Cargo.toml`. The arrow-rs additions:

- `pub fn can_swap_strategy(&self) -> bool` — true between row groups.
- `pub fn swap_strategy(&mut self, swap: StrategySwap) -> Result<()>` — replaces projection / row filter / row selection policy at a row-group boundary; rejected mid-row-group.
- `pub struct StrategySwap` (`#[non_exhaustive]`) with builder methods.
- `pub fn row_groups_remaining(&self) -> usize` for diagnostics.

`PushBuffers` carries through the swap, so bytes already fetched for columns that survive the new strategy are reused.

# What's removed (vs PR #9)

- The chunk loop (`ParquetAccessPlan::split_into_chunks`, `Vec<BoxStream>` returns from `build_stream`).
- Per-chunk `AsyncFileReader::create_reader` minting and per-chunk `RowFilter` rebuild (`RowFilter` is `!Clone`).
- The `EarlyStoppingStream`-on-chunk-0-only special case for the non-`Clone` `FilePruner`.
- `LazyMorselShared` per-morsel Arc churn — the source of the ~10% aggregate ClickBench regression flagged in review of PR #9 (Adaptive filter scheduling + row-group morsel split).

# What's added

`AdaptiveParquetStream` in `opener.rs` drives one row group at a time via `try_next_reader`:

1. Pull a `ParquetRecordBatchReader` for the next row group.
2. Iterate it synchronously; each batch goes through any post-scan filters (which feed per-filter stats into the tracker) and then through the projector.
3. When the reader exhausts, ask the tracker to re-partition filters based on accumulated stats. If the row-filter set changed, build a new `RowFilter` and call `decoder.swap_strategy(...)` before requesting the next reader. Post-scan filters update in lockstep.

`PushBuffers` carries through the swap so already-fetched bytes are preserved. The optional-filter mid-stream skip mechanism (existing `OptionalFilterPhysicalExpr` + `tracker.is_filter_skipped`) keeps working unchanged inside `apply_post_scan_filters_with_stats`.

# Carried-over machinery (file-level checkout from `dbcf5ac1e`)

- `selectivity.rs` — `SelectivityTracker`, `PartitionedFilters`, `FilterId`, Welford CI bounds.
- `row_filter.rs` — new `build_row_filter` signature returning `(Option<RowFilter>, UnbuildableFilters)` plus `total_compressed_bytes`, plus `DatafusionArrowPredicate` stat hooks.
- `physical_expr.rs` — `OptionalFilterPhysicalExpr`, `snapshot_generation` helpers. `Display` is pass-through here (PR #9 used `Optional(...)`), keeping every existing sqllogictest expected output intact.
- `config.rs` — adds `filter_pushdown_min_bytes_per_sec` / `filter_collecting_byte_ratio_threshold` / `filter_confidence_z`. `reorder_filters` is preserved as a deprecated no-op — the adaptive tracker subsumes it.
- `source.rs`: `predicate_conjuncts: Vec<(FilterId, Arc<PhysicalExpr>)>` instead of a single AND-ed predicate so per-conjunct stats accumulate across files.

# Deferred

- Sub-row-group adaptation (would need a `ParquetRecordBatchReader::pause` in arrow-rs to yield a residual `RowSelection`). Useful for TPCDS-style single-huge-row-group files. Out of scope here.
- The three new config knobs aren't in the proto schema yet; `from_proto` fills with config defaults so a roundtrip preserves behavior. Worth a follow-up to plumb them through the proto.

# Known pre-existing CI flake

`datafusion/sqllogictest/test_files/explain_analyze.slt:103` (the `output_rows_skew` skew metric test) is failing on `apache/datafusion` `main` itself — see run 25027102370 on commit `310dd5d4`, identical diff `expected 84.31% / actual 100%`. Not introduced by this branch. Fixing it is out of scope; this PR matches the pre-existing CI baseline.
# Test plan

- `cargo fmt --all`
- `cargo clippy --workspace --all-targets ... -- -D warnings`
- `cargo test -p datafusion-datasource-parquet --lib` — 143 passed
- `cargo test -p datafusion --lib` — 410 passed
- `cargo test -p datafusion --test core_integration` — 935 passed
- `cargo doc --workspace --no-deps` — clean
- `cargo run --example data_io` — `json_shredding` passes with `pushdown_rows_pruned=1`
- `cargo test -p datafusion-sqllogictest --test sqllogictests` — pass except the pre-existing `explain_analyze.slt` and `encrypted_parquet.slt` flakes that fail on `apache/main`

# Benchmarks

- Runs across `608f280c4` / `32195d6da` / `4a4e300eb`: the `LazyMorselShared` churn disappears.
- `small_table JOIN large_table` with `WHERE small_table.v >= 50`: unchanged from main.

🤖 Generated with Claude Code
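The Welford CI bounds carried in `selectivity.rs` can be sketched as follows; the struct, the sample-variance choice, and the clamping to [0, 1] are illustrative assumptions, not the crate's implementation, with `filter_confidence_z` supplying the z used here:

```rust
// Hedged sketch of a Welford-style confidence bound on filter selectivity:
// a running mean/variance over per-batch selectivity observations plus a
// z-scaled interval, clamped to the valid selectivity range.

pub struct Welford {
    n: u64,
    mean: f64,
    m2: f64, // sum of squared deviations (Welford's M2)
}

impl Welford {
    pub fn new() -> Self {
        Self { n: 0, mean: 0.0, m2: 0.0 }
    }
    /// One Welford update with a per-batch selectivity observation in [0, 1].
    pub fn update(&mut self, x: f64) {
        self.n += 1;
        let d = x - self.mean;
        self.mean += d / self.n as f64;
        self.m2 += d * (x - self.mean);
    }
    /// (lower, upper) bounds on mean selectivity at z standard errors,
    /// clamped to [0, 1].
    pub fn ci(&self, z: f64) -> (f64, f64) {
        if self.n < 2 {
            return (0.0, 1.0); // no variance estimate yet: maximally uncertain
        }
        let var = self.m2 / (self.n - 1) as f64; // sample variance
        let half = z * (var / self.n as f64).sqrt();
        ((self.mean - half).max(0.0), (self.mean + half).min(1.0))
    }
}

fn main() {
    let mut w = Welford::new();
    for x in [0.1, 0.2, 0.3] {
        w.update(x);
    }
    let (lo, hi) = w.ci(1.96);
    // mean is 0.2; after three batches the interval is well inside [0, 1]
    assert!(lo > 0.08 && hi < 0.32);
    println!("[{lo:.3}, {hi:.3}]");
}
```

Until the interval is tight enough to commit to a placement, a tracker built on bounds like these can keep a filter in a collecting state, which is the role the confidence knob plays in the config additions above.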